package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
	"github.com/go-playground/validator/v10"
	"gopkg.in/yaml.v3"

	"dash/collectors"
	"dash/utils/httplog"
)

// newProducer creates a sarama asynchronous producer connected to the single
// broker address given in kafkaDSN.
//
// The producer is configured to wait for acknowledgement from all in-sync
// replicas, retry a failed send up to 10 times, accept messages of up to 1e7
// bytes, and report successful deliveries on its Successes channel.
//
// It panics if the producer cannot be constructed.
func newProducer(kafkaDSN string) sarama.AsyncProducer {
	opts := sarama.NewConfig()
	opts.Producer.Return.Successes = true
	opts.Producer.RequiredAcks = sarama.WaitForAll
	opts.Producer.Retry.Max = 10
	opts.Producer.MaxMessageBytes = 1e7

	brokers := []string{kafkaDSN}
	asyncProducer, err := sarama.NewAsyncProducer(brokers, opts)
	if err != nil {
		panic(err)
	}
	return asyncProducer
}

// collectorConfig describes the settings decoded from the YAML configuration
// file. Each field is matched to its YAML key through the struct tag.
type collectorConfig struct {
	// HTTP side: Port is formatted into the listen address and UploadDir is
	// handed to the collector service.
	Port      int    `yaml:"port"`
	UploadDir string `yaml:"uploadDir"`

	// KafkaDSN is the broker address passed to newProducer.
	KafkaDSN string `yaml:"kafkaDSN"`

	// Topic names forwarded to the Kafka reporter repository.
	TextMessageTopic            string `yaml:"textMessageTopic"`
	OtherMessageTopic           string `yaml:"otherMessageTopic"`
	CollectorJoinChatroomTopic  string `yaml:"collectorJoinChatroomTopic"`
	CollectorLeaveChatroomTopic string `yaml:"collectorLeaveChatroomTopic"`
	MemberJoinChatroomTopic     string `yaml:"memberJoinChatroomTopic"`
	MemberLeaveChatroomTopic    string `yaml:"memberLeaveChatroomTopic"`
	ChatroomMemberTopic         string `yaml:"chatroomMemberTopic"`
	LogTopic                    string `yaml:"logTopic"`
	HeartBeatTopic              string `yaml:"heartBeatTopic"`
}

// cfg is the process-wide configuration; it is filled in by init before main
// runs.
var cfg collectorConfig

// init populates cfg from the file "config.yaml", resolved relative to the
// process working directory. It panics if the file cannot be read or if its
// contents cannot be decoded as YAML into cfg.
func init() {
	const configPath = "config.yaml"

	raw, readErr := ioutil.ReadFile(configPath)
	if readErr != nil {
		panic(readErr)
	}

	parseErr := yaml.Unmarshal(raw, &cfg)
	if parseErr != nil {
		panic(parseErr)
	}
}

// main wires up the collector service and serves it over HTTP until the
// server fails or the process receives SIGINT/SIGTERM.
//
// Start-up order: structured JSON logger, Kafka async producer (with
// goroutines draining its Errors and Successes channels), Kafka-backed
// reporter repository, collector service, request validator, and finally the
// HTTP handler wrapped in request logging.
//
// Shutdown order: the HTTP server is shut down first, waiting a bounded time
// for in-flight requests, and only then is the producer closed by the deferred
// call. This keeps handlers from publishing to a producer that is already
// closing.
func main() {
	const (
		// readHeaderTimeout bounds how long a client may take to send its
		// request headers, so idle connections cannot be held open forever.
		readHeaderTimeout = 10 * time.Second
		// shutdownTimeout bounds how long in-flight requests are given to
		// finish once the process has been asked to exit.
		shutdownTimeout = 15 * time.Second
	)

	var logger log.Logger
	{
		logger = log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
		logger = log.With(
			logger,
			"caller", log.DefaultCaller,
			"ts", log.DefaultTimestampUTC,
			"svc", "collector",
		)
	}

	producer := newProducer(cfg.KafkaDSN)
	// Runs last, after the HTTP server has stopped accepting and serving
	// requests (see the Shutdown call at the end of main).
	defer func() {
		if err := producer.Close(); err != nil {
			_ = logger.Log("err", err)
		}
	}()
	// Both result channels are drained for the lifetime of the producer; the
	// loops end when the producer closes the channels.
	go func() {
		for err := range producer.Errors() {
			_ = logger.Log("err", err)
		}
	}()
	go func() {
		for success := range producer.Successes() {
			_ = logger.Log("topic", success.Topic, "offset", success.Offset)
		}
	}()
	repo := collectors.NewKafkaCollectorReporterRepository(logger, producer, collectors.NewKafkaCollectorReporterRepositoryOpts{
		TextMessageTopic:            cfg.TextMessageTopic,
		OtherMessageTopic:           cfg.OtherMessageTopic,
		CollectorJoinChatroomTopic:  cfg.CollectorJoinChatroomTopic,
		CollectorLeaveChatroomTopic: cfg.CollectorLeaveChatroomTopic,
		MemberJoinChatroomTopic:     cfg.MemberJoinChatroomTopic,
		MemberLeaveChatroomTopic:    cfg.MemberLeaveChatroomTopic,
		ChatroomMemberTopic:         cfg.ChatroomMemberTopic,
		LogTopic:                    cfg.LogTopic,
		HeartBeatTopic:              cfg.HeartBeatTopic,
	})

	svc := collectors.NewCollectorService(logger, repo, cfg.UploadDir)

	validate := validator.New()

	var handler http.Handler
	{
		handler = collectors.NewHandler(logger, validate, svc)
		handler = httplog.HttpLog(logger, handler)
	}

	srv := &http.Server{
		Addr:              fmt.Sprintf(":%d", cfg.Port),
		Handler:           handler,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// Buffered for both senders so that the goroutine whose value is never
	// received (e.g. ListenAndServe returning after Shutdown) does not block.
	errs := make(chan error, 2)
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		errs <- fmt.Errorf("%s", <-signals)
	}()
	go func() {
		errs <- srv.ListenAndServe()
	}()
	_ = logger.Log("exit", (<-errs).Error())

	// Stop accepting connections and wait (bounded) for in-flight requests
	// before the deferred producer.Close runs.
	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		_ = logger.Log("err", err)
	}
}
